[[serdes]]
= Serialization, Deserialization, and Message Conversion

[[overview]]
== Overview

Apache Kafka provides a high-level API for serializing and deserializing record values as well as their keys.
The API consists of the `org.apache.kafka.common.serialization.Serializer<T>` and
`org.apache.kafka.common.serialization.Deserializer<T>` abstractions, along with some built-in implementations.
You can specify the serializer and deserializer classes by using `Producer` or `Consumer` configuration properties.
The following example shows how to do so:

[source, java]
----
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
----

For more complex or particular cases, `KafkaConsumer` and `KafkaProducer` provide overloaded
constructors that accept `Deserializer` and `Serializer` instances, respectively, for keys and values.

When you use this API, the `DefaultKafkaProducerFactory` and `DefaultKafkaConsumerFactory` also provide properties (through constructors or setter methods) to inject custom `Serializer` and `Deserializer` instances into the target `Producer` or `Consumer`.
You can also pass in `Supplier<Serializer>` or `Supplier<Deserializer>` instances through constructors; these `Supplier`+++s+++ are called each time a `Producer` or `Consumer` is created.

[[string-serde]]
== String serialization

Since version 2.5, Spring for Apache Kafka provides `ToStringSerializer` and `ParseStringDeserializer` classes that use the `String` representation of entities.
The serializer relies on the entity's `toString()` method, and the deserializer relies on a `Function<String, T>` or `BiFunction<String, Headers, T>` to parse the `String` and populate the properties of an instance.
Usually, this function invokes a static method on the class, such as `parse`:

[source, java]
----
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
----

By default, the `ToStringSerializer` is configured to convey type information about the serialized entity in the record `Headers`.
The `ParseStringDeserializer` can use this information on the receiving side.
You can disable this by setting the `addTypeInfo` property to `false`, or by using the following configuration property:

* `ToStringSerializer.ADD_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to disable this feature on the `ToStringSerializer` (sets the `addTypeInfo` property).

[source, java]
----
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
    byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
    String entityType = new String(header);

    if (entityType.contains("Thing")) {
        return Thing.parse(str);
    }
    else {
        // ...parsing logic
    }
});
----

You can configure the `Charset` used to convert `String` to/from `byte[]` with the default being `UTF-8`.

You can configure the deserializer with the name of the parser method using `ConsumerConfig` properties:

* `ParseStringDeserializer.KEY_PARSER`
* `ParseStringDeserializer.VALUE_PARSER`

The properties must contain the fully qualified name of the class followed by the method name, separated by a period `.`.
The method must be static and have a signature of either `(String, Headers)` or `(String)`.
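
The naming rule above (class name, a period, then the method name) can be illustrated with plain reflection.
The following is a minimal sketch using only the JDK, not the framework's own lookup code; `ParserMethodSketch` is a hypothetical name, and `java.lang.Integer.valueOf` merely stands in for a parser method with a `(String)` signature.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class ParserMethodSketch {

    // Splits "fully.qualified.ClassName.methodName" at the last period and
    // invokes the public static (String) method reflectively.
    static Object invoke(String property, String input) {
        int lastDot = property.lastIndexOf('.');
        if (lastDot <= 0 || lastDot == property.length() - 1) {
            throw new IllegalArgumentException("Expected 'ClassName.methodName' but got: " + property);
        }
        String className = property.substring(0, lastDot);
        String methodName = property.substring(lastDot + 1);
        try {
            Method method = Class.forName(className).getMethod(methodName, String.class);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new IllegalArgumentException("Parser method must be static: " + property);
            }
            return method.invoke(null, input);
        }
        catch (ReflectiveOperationException ex) {
            // Wrap checked reflection failures so that callers see one unchecked exception type
            throw new IllegalStateException("Cannot invoke parser method: " + property, ex);
        }
    }
}
```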

A `ToFromStringSerde` is also provided, for use with Kafka Streams.

[[json-serde]]
== JSON

Spring for Apache Kafka also provides `JsonSerializer` and `JsonDeserializer` implementations that are based on the
Jackson JSON object mapper.
The `JsonSerializer` allows writing any Java object as a JSON `byte[]`.
The `JsonDeserializer` requires an additional `Class<?> targetType` argument to allow the deserialization of a consumed `byte[]` to the proper target object.
The following example shows how to create a `JsonDeserializer`:

[source, java]
----
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
----

You can customize both `JsonSerializer` and `JsonDeserializer` with an `ObjectMapper`.
You can also extend them to implement some particular configuration logic in the `configure(Map<String, ?> configs, boolean isKey)` method.

Starting with version 2.3, all the JSON-aware components are configured by default with a `JacksonUtils.enhancedObjectMapper()` instance, which comes with the `MapperFeature.DEFAULT_VIEW_INCLUSION` and `DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES` features disabled.
This instance is also supplied with well-known modules for custom data types, such as Java time and Kotlin support.
See the `JacksonUtils.enhancedObjectMapper()` Javadoc for more information.
This method also registers an `org.springframework.kafka.support.JacksonMimeTypeModule`, which serializes `org.springframework.util.MimeType` objects as plain strings for inter-platform compatibility over the network.
You can register a `JacksonMimeTypeModule` as a bean in the application context, and it is auto-configured into the https://docs.spring.io/spring-boot/docs/current/reference/html/howto.html#howto.spring-mvc.customize-jackson-objectmapper[Spring Boot `ObjectMapper` instance].

Also starting with version 2.3, the `JsonDeserializer` provides `TypeReference`-based constructors for better handling of target generic container types.

Starting with version 2.1, you can convey type information in record `Headers`, allowing the handling of multiple types.
In addition, you can configure the serializer and deserializer by using the following Kafka properties.
They have no effect if you have provided `Serializer` and `Deserializer` instances for `KafkaProducer` and `KafkaConsumer`, respectively.

[[serdes-json-config]]
=== Configuration Properties

* `JsonSerializer.ADD_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to disable this feature on the `JsonSerializer` (sets the `addTypeInfo` property).
* `JsonSerializer.TYPE_MAPPINGS` (default `empty`): See xref:kafka/serdes.adoc#serdes-mapping-types[Mapping Types].
* `JsonDeserializer.USE_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to ignore headers set by the serializer.
* `JsonDeserializer.REMOVE_TYPE_INFO_HEADERS` (default `true`): You can set it to `false` to retain headers set by the serializer.
* `JsonDeserializer.KEY_DEFAULT_TYPE`: Fallback type for deserialization of keys if no header information is present.
* `JsonDeserializer.VALUE_DEFAULT_TYPE`: Fallback type for deserialization of values if no header information is present.
* `JsonDeserializer.TRUSTED_PACKAGES` (default `java.util`, `java.lang`): Comma-delimited list of package patterns allowed for deserialization.
`*` means that all packages are trusted.
* `JsonDeserializer.TYPE_MAPPINGS` (default `empty`): See xref:kafka/serdes.adoc#serdes-mapping-types[Mapping Types].
* `JsonDeserializer.KEY_TYPE_METHOD` (default `empty`): See xref:kafka/serdes.adoc#serdes-type-methods[Using Methods to Determine Types].
* `JsonDeserializer.VALUE_TYPE_METHOD` (default `empty`): See xref:kafka/serdes.adoc#serdes-type-methods[Using Methods to Determine Types].

Starting with version 2.2, the type information headers (if added by the serializer) are removed by the deserializer.
You can revert to the previous behavior by setting the `removeTypeHeaders` property to `false`, either directly on the deserializer or with the configuration property described earlier.

See also xref:tips.adoc#tip-json[Customizing the JsonSerializer and JsonDeserializer].

IMPORTANT: Starting with version 2.8, if you construct the serializer or deserializer programmatically as shown in xref:kafka/serdes.adoc#prog-json[Programmatic Construction], the above properties will be applied by the factories, as long as you have not set any properties explicitly (using `set*()` methods or using the fluent API).
Previously, when creating programmatically, the configuration properties were never applied; this is still the case if you explicitly set properties on the object directly.

[[serdes-mapping-types]]
=== Mapping Types

Starting with version 2.2, when using JSON, you can now provide type mappings by using the properties in the preceding list.
Previously, you had to customize the type mapper within the serializer and deserializer.
Mappings consist of a comma-delimited list of `token:className` pairs.
On outbound, the payload's class name is mapped to the corresponding token.
On inbound, the token in the type header is mapped to the corresponding class name.

The following example creates a set of mappings:

[source, java]
----
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.Hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.Hat");
----

IMPORTANT: The corresponding objects must be compatible.
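
To make the two lookup directions concrete, the following JDK-only sketch parses a `token:className` list and builds both the inbound map (token to class name) and the outbound map (class name to token).
It is an illustration under the assumption that entries are split on commas and on the first colon; `TypeMappingsSketch` is a hypothetical name and this is not the framework's type mapper.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TypeMappingsSketch {

    // Inbound direction: parses "token:className, token:className" into token -> class name.
    static Map<String, String> tokenToClassName(String mappings) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : mappings.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2 || parts[0].trim().isEmpty() || parts[1].trim().isEmpty()) {
                throw new IllegalArgumentException("Expected 'token:className' but got: " + entry);
            }
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    // Outbound direction: inverts the parsed map into class name -> token.
    static Map<String, String> classNameToToken(String mappings) {
        Map<String, String> inverted = new LinkedHashMap<>();
        tokenToClassName(mappings).forEach((token, className) -> inverted.put(className, token));
        return inverted;
    }
}
```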

If you use https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka[Spring Boot], you can provide these properties in the `application.properties` (or yaml) file.
The following example shows how to do so:

[source]
----
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
----


[IMPORTANT]
====
You can perform only simple configuration with properties.
For more advanced configuration (such as using a custom `ObjectMapper` in the serializer and deserializer), you should use the producer and consumer factory constructors that accept a pre-built serializer and deserializer.
The following Spring Boot example overrides the default factories:

=====
[source, java]
----
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(JsonDeserializer customValueDeserializer) {
    Map<String, Object> properties = new HashMap<>();
    // properties.put(..., ...)
    // ...
    return new DefaultKafkaConsumerFactory<>(properties,
        new StringDeserializer(), customValueDeserializer);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties,
        JsonSerializer customValueSerializer) {
    // KafkaProperties is Spring Boot's bound spring.kafka.* configuration
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
        new StringSerializer(), customValueSerializer);
}
----
=====

Setters are also provided, as an alternative to using these constructors.
====

NOTE: When using Spring Boot and overriding the `ConsumerFactory` and `ProducerFactory` as shown above, the bean method return types must use wildcard generic types.
If concrete generic types are provided instead, Spring Boot ignores these beans and still uses the default ones.

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` argument (which is `true` by default).
The following example shows how to do so:

[source, java]
----
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
        new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
----

[[serdes-type-methods]]
=== Using Methods to Determine Types

Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type.
If present, this will override any of the other techniques discussed above.
This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data or other headers.
Set these properties to the method name: a fully qualified class name followed by the method name, separated by a period `.`.
The method must be declared as `public static`, have one of three signatures (`(String topic, byte[] data, Headers headers)`, `(byte[] data, Headers headers)`, or `(byte[] data)`), and return a Jackson `JavaType`.

* `JsonDeserializer.KEY_TYPE_METHOD` : `spring.json.key.type.method`
* `JsonDeserializer.VALUE_TYPE_METHOD` : `spring.json.value.type.method`

You can use arbitrary headers or inspect the data to determine the type.

[source, java]
----
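// Declared static so that the static type method below can reference them
private static final JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);

private static final JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);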

public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
    // {"thisIsAFieldInThing1":"value", ...
    if (data[21] == '1') {
        return thing1Type;
    }
    else {
        return thing2Type;
    }
}
----

For more sophisticated data inspection, consider using `JsonPath` or similar; however, the simpler the test to determine the type, the more efficient the process will be.
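
A fixed byte offset is fast but breaks as soon as the producer changes whitespace or field order.
As a middle ground, the following JDK-only sketch looks for a quoted field name within the first bytes of the payload.
`PayloadSniffer` and `hasFieldNearStart` are hypothetical names; a real type method would return the matching `JavaType` based on this boolean, and the sketch assumes a UTF-8 encoded payload.

```java
import java.nio.charset.StandardCharsets;

final class PayloadSniffer {

    // Returns true if the first `limit` bytes of the payload contain the quoted field name.
    static boolean hasFieldNearStart(byte[] data, String fieldName, int limit) {
        int length = Math.min(data.length, Math.max(limit, 0));
        // Decode only the head of the payload to keep the check cheap
        String head = new String(data, 0, length, StandardCharsets.UTF_8);
        return head.contains("\"" + fieldName + "\"");
    }
}
```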

The following is an example of creating the deserializer programmatically (when providing the consumer factory with the deserializer in the constructor):

[source, java]
----
JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);

...

public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
    ...
}
----

[[prog-json]]
=== Programmatic Construction

When constructing the serializer/deserializer programmatically for use in the producer/consumer factory, since version 2.3, you can use the fluent API, which simplifies configuration.

[source, java]
----
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
        new JsonSerializer<MyKeyType>()
            .forKeys()
            .noTypeInfo(),
        new JsonSerializer<MyValueType>()
            .noTypeInfo());
    return pf;
}

@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
    Map<String, Object> props = new HashMap<>();
    // props.put(..., ...)
    // ...
    DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
        new JsonDeserializer<>(MyKeyType.class)
            .forKeys()
            .ignoreTypeHeaders(),
        new JsonDeserializer<>(MyValueType.class)
            .ignoreTypeHeaders());
    return cf;
}
----

To provide type mapping programmatically, similar to xref:kafka/serdes.adoc#serdes-type-methods[Using Methods to Determine Types], use the `typeFunction` property.

[source, java]
----
JsonDeserializer<Object> deser = new JsonDeserializer<>()
        .trustedPackages("*")
        .typeFunction(MyUtils::thingOneOrThingTwo);
----

Alternatively, as long as you don't use the fluent API to configure properties, or set them using `set*()` methods, the factories will configure the serializer/deserializer using the configuration properties; see xref:kafka/serdes.adoc#serdes-json-config[Configuration Properties].

[[delegating-serialization]]
== Delegating Serializer and Deserializer

[[using-headers]]
=== Using Headers

Version 2.3 introduced the `DelegatingSerializer` and `DelegatingDeserializer`, which allow producing and consuming records with different key and/or value types.
Producers must set a header `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR` to a selector value that is used to select which serializer to use for the value and `DelegatingSerializer.KEY_SERIALIZATION_SELECTOR` for the key; if a match is not found, an `IllegalStateException` is thrown.

For incoming records, the deserializer uses the same headers to select the deserializer to use; if a match is not found or the header is not present, the raw `byte[]` is returned.

You can configure the map of selector to `Serializer` / `Deserializer` via a constructor, or you can configure it via Kafka producer/consumer properties with the keys `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG` and `DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG`.
For the serializer, the producer property can be a `Map<String, Object>` where the key is the selector and the value is a `Serializer` instance, a serializer `Class` or the class name.
The property can also be a String of comma-delimited map entries, as shown below.

For the deserializer, the consumer property can be a `Map<String, Object>` where the key is the selector and the value is a `Deserializer` instance, a deserializer `Class` or the class name.
The property can also be a String of comma-delimited map entries, as shown below.

To configure using properties, use the following syntax:

[source, java]
----
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer");

consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
    "thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer");
----

Producers would then set the `DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR` header to `thing1` or `thing2`.

This technique supports sending different types to the same topic (or different topics).
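
The selector behavior described in this section can be sketched with plain functions standing in for the delegates.
This is an illustration only, using JDK types rather than Kafka `Serializer` and `Deserializer` instances; `SelectorDelegationSketch` is a hypothetical name.
It shows the asymmetry between the two directions: an unknown selector fails on the outbound side, while the inbound side falls back to the raw `byte[]`.

```java
import java.util.Map;
import java.util.function.Function;

final class SelectorDelegationSketch {

    // Outbound: the selector must match a configured delegate, otherwise fail fast.
    static byte[] serialize(Map<String, Function<Object, byte[]>> serializers, String selector, Object value) {
        Function<Object, byte[]> delegate = selector == null ? null : serializers.get(selector);
        if (delegate == null) {
            throw new IllegalStateException("No serializer found for selector '" + selector + "'");
        }
        return delegate.apply(value);
    }

    // Inbound: a missing header (null selector) or an unknown selector returns the raw bytes.
    static Object deserialize(Map<String, Function<byte[], Object>> deserializers, String selector, byte[] data) {
        Function<byte[], Object> delegate = selector == null ? null : deserializers.get(selector);
        return delegate == null ? data : delegate.apply(data);
    }
}
```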

NOTE: Starting with version 2.5.1, it is not necessary to set the selector header if the type (key or value) is one of the standard types supported by `Serdes` (`Long`, `Integer`, and so on).
Instead, the serializer sets the header to the class name of the type.
It is not necessary to configure serializers or deserializers for these types; they are created (once) dynamically.

For another technique to send different types to different topics, see xref:kafka/sending-messages.adoc#routing-template[Using `RoutingKafkaTemplate`].

[[by-type]]
=== By Type

Version 2.8 introduced the `DelegatingByTypeSerializer`.

[source, java]
----
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null, new DelegatingByTypeSerializer(Map.of(
                    byte[].class, new ByteArraySerializer(),
                    Bytes.class, new BytesSerializer(),
                    String.class, new StringSerializer())));
}
----

Starting with version 2.8.3, you can configure the serializer to check whether the map key is assignable from the target object, which is useful when a delegate serializer can serialize subclasses.
In this case, if there are ambiguous matches, an ordered `Map`, such as a `LinkedHashMap`, should be provided.
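
The reason an ordered map matters can be seen in a small JDK-only sketch of an assignability lookup, where the first matching entry wins.
`AssignableLookupSketch` is a hypothetical name and the strings stand in for delegate serializers; the exception type is an assumption of this sketch, not the framework's behavior.
With a `LinkedHashMap`, more specific types should be registered before more general ones.

```java
import java.util.Map;

final class AssignableLookupSketch {

    // Returns the value of the first entry whose key type is assignable from the target's class.
    static <V> V findDelegate(Map<Class<?>, V> delegates, Object target) {
        for (Map.Entry<Class<?>, V> entry : delegates.entrySet()) {
            if (entry.getKey().isAssignableFrom(target.getClass())) {
                return entry.getValue();
            }
        }
        throw new IllegalArgumentException("No delegate for " + target.getClass().getName());
    }
}
```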

[[by-topic]]
=== By Topic

Starting with version 2.8, the `DelegatingByTopicSerializer` and `DelegatingByTopicDeserializer` allow selection of a serializer/deserializer based on the topic name.
Regex `Pattern`+++s+++ are used to look up the instance to use.
The map can be configured using a constructor, or via properties (a comma-delimited list of `pattern:serializer`).

[source, java]
----
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArraySerializer.class.getName()
        + ", topic[5-9]:" + StringSerializer.class.getName());
...
consumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
            "topic[0-4]:" + ByteArrayDeserializer.class.getName()
        + ", topic[5-9]:" + StringDeserializer.class.getName());
----

Use `KEY_SERIALIZATION_TOPIC_CONFIG` when using this for keys.

[source, java]
----
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer()),
                    new JsonSerializer<Object>()));  // default
}
----

You can specify a default serializer/deserializer to use when there is no pattern match using `DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT` and `DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT`.

An additional property, `DelegatingByTopicSerialization.CASE_SENSITIVE` (default `true`), makes the topic lookup case insensitive when set to `false`.
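
The pattern lookup, the default fallback, and the case-sensitivity switch can be sketched with the JDK alone.
In this illustration, `TopicLookupSketch` is a hypothetical name, strings stand in for the delegates, and the sketch assumes that a pattern must match the whole topic name; it is not the framework's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

final class TopicLookupSketch {

    // Compiles "pattern:value, pattern:value" into an ordered Pattern -> value map.
    static Map<Pattern, String> parse(String config, boolean caseSensitive) {
        int flags = caseSensitive ? 0 : Pattern.CASE_INSENSITIVE;
        Map<Pattern, String> result = new LinkedHashMap<>();
        for (String entry : config.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected 'pattern:value' but got: " + entry);
            }
            result.put(Pattern.compile(parts[0].trim(), flags), parts[1].trim());
        }
        return result;
    }

    // Returns the value of the first pattern that matches the whole topic name, else the default.
    static String find(Map<Pattern, String> delegates, String topic, String defaultValue) {
        for (Map.Entry<Pattern, String> entry : delegates.entrySet()) {
            if (entry.getKey().matcher(topic).matches()) {
                return entry.getValue();
            }
        }
        return defaultValue;
    }
}
```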

[[retrying-deserialization]]
== Retrying Deserializer

The `RetryingDeserializer` uses a delegate `Deserializer` and `RetryTemplate` to retry deserialization when the delegate might have transient errors, such as network issues, during deserialization.

[source, java]
----
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
    new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
    new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
----

Starting with version `3.1.2`, you can optionally set a `RecoveryCallback` on the `RetryingDeserializer`.

Refer to the https://github.com/spring-projects/spring-retry[spring-retry] project for configuration of the `RetryTemplate` with a retry policy, back off policy, etc.
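
Conceptually, the retrying wrapper re-invokes its delegate until it succeeds or the retry policy is exhausted.
The following JDK-only sketch shows that idea with a fixed number of attempts and no back off; it does not use `RetryTemplate`, and `RetrySketch` is a hypothetical name.

```java
import java.util.function.Function;

final class RetrySketch {

    // Calls the delegate up to maxAttempts times and rethrows the last failure if all attempts fail.
    static <T> T deserializeWithRetry(Function<byte[], T> delegate, byte[] data, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return delegate.apply(data);
            }
            catch (RuntimeException ex) {
                last = ex; // remember the failure and try again
            }
        }
        throw last;
    }
}
```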


[[messaging-message-conversion]]
== Spring Messaging Message Conversion

Although the `Serializer` and `Deserializer` API is quite simple and flexible from the low-level Kafka `Consumer` and `Producer` perspective, you might need more flexibility at the Spring Messaging level, when using either `@KafkaListener` or https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka[Spring Integration's Apache Kafka Support].
To let you easily convert to and from `org.springframework.messaging.Message`, Spring for Apache Kafka provides a `MessageConverter` abstraction with the `MessagingMessageConverter` implementation and its `JsonMessageConverter` customization (and subclasses).
You can inject the `MessageConverter` into a `KafkaTemplate` instance directly, or into the `AbstractKafkaListenerContainerFactory` bean definition referenced by the `@KafkaListener.containerFactory()` property.
The following example shows how to do so:

[source, java]
----
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordMessageConverter(new JsonMessageConverter());
    return factory;
}
...
@KafkaListener(topics = "jsonData",
                containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
----

When using Spring Boot, simply define the converter as a `@Bean` and Spring Boot auto configuration will wire it into the auto-configured template and container factory.

When you use a `@KafkaListener`, the parameter type is provided to the message converter to assist with the conversion.

[NOTE]
====
This type inference can be achieved only when the `@KafkaListener` annotation is declared at the method level.
With a class-level `@KafkaListener`, the payload type is used to select which `@KafkaHandler` method to invoke, so it must already have been converted before the method can be chosen.
====

[NOTE]
====
On the consumer side, you can configure a `JsonMessageConverter`; it can handle `ConsumerRecord` values of type `byte[]`, `Bytes`, and `String`, so it should be used in conjunction with a `ByteArrayDeserializer`, `BytesDeserializer`, or `StringDeserializer`.
(`byte[]` and `Bytes` are more efficient because they avoid an unnecessary `byte[]` to `String` conversion).
You can also configure the specific subclass of `JsonMessageConverter` corresponding to the deserializer, if you so wish.

On the producer side, when you use Spring Integration or the `KafkaTemplate.send(Message<?> message)` method (see xref:kafka/sending-messages.adoc#kafka-template[Using `KafkaTemplate`]), you must configure a message converter that is compatible with the configured Kafka `Serializer`.

* `StringJsonMessageConverter` with `StringSerializer`
* `BytesJsonMessageConverter` with `BytesSerializer`
* `ByteArrayJsonMessageConverter` with `ByteArraySerializer`

Again, using `byte[]` or `Bytes` is more efficient because they avoid a `String` to `byte[]` conversion.

For convenience, starting with version 2.3, the framework also provides a `StringOrBytesSerializer` which can serialize all three value types so it can be used with any of the message converters.
====

Starting with version 2.7.1, message payload conversion can be delegated to a `spring-messaging` `SmartMessageConverter`; this enables conversion, for example, to be based on the `MessageHeaders.CONTENT_TYPE` header.

IMPORTANT: The `KafkaMessageConverter.fromMessage()` method is called for outbound conversion to a `ProducerRecord` with the message payload in the `ProducerRecord.value()` property.
The `KafkaMessageConverter.toMessage()` method is called for inbound conversion from `ConsumerRecord` with the payload being the `ConsumerRecord.value()` property.
The `SmartMessageConverter.toMessage()` method is called to create a new outbound `Message<?>` from the `Message` passed to `fromMessage()` (usually by `KafkaTemplate.send(Message<?> msg)`).
Similarly, in the `KafkaMessageConverter.toMessage()` method, after the converter has created a new `Message<?>` from the `ConsumerRecord`, the `SmartMessageConverter.fromMessage()` method is called and then the final inbound message is created with the newly converted payload.
In either case, if the `SmartMessageConverter` returns `null`, the original message is used.

When the default converter is used in the `KafkaTemplate` and listener container factory, you configure the `SmartMessageConverter` by calling `setMessagingConverter()` on the template and via the `contentTypeConverter` property on `@KafkaListener` methods.

Examples:

[source, java]
----
template.setMessagingConverter(mySmartConverter);
----

[source, java]
----
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
    contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
    ...
}
----

[[data-projection]]
=== Using Spring Data Projection Interfaces

Starting with version 2.1.1, you can convert JSON to a Spring Data Projection interface instead of a concrete type.
This allows very selective and loosely coupled bindings to data, including the lookup of values from multiple places inside the JSON document.
For example, the following interface can be defined as the message payload type:

[source, java]
----
interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
----

[source, java]
----
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}
----

By default, accessor methods are used to look up the property name as a field in the received JSON document.
The `@JsonPath` expression allows customization of the value lookup, and even lets you define multiple JSON Path expressions to look up values from multiple places until an expression returns an actual value.

To enable this feature, use a `ProjectingMessageConverter` configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces).
You must also add `org.springframework.data:spring-data-commons` and `com.jayway.jsonpath:json-path` to the classpath.

When used as the parameter to a `@KafkaListener` method, the interface type is automatically passed to the converter as normal.

[[error-handling-deserializer]]
== Using `ErrorHandlingDeserializer`

When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before the `poll()` returns.
To solve this problem, the `ErrorHandlingDeserializer` has been introduced.
This deserializer delegates to a real deserializer (key or value).
If the delegate fails to deserialize the record content, the `ErrorHandlingDeserializer` returns a `null` value and a `DeserializationException` in a header that contains the cause and the raw bytes.
When you use a record-level `MessageListener`, if the `ConsumerRecord` contains a `DeserializationException` header for either the key or value, the container's `ErrorHandler` is called with the failed `ConsumerRecord`.
The record is not passed to the listener.

Alternatively, you can configure the `ErrorHandlingDeserializer` to create a custom value by providing a `failedDeserializationFunction`, which is a `Function<FailedDeserializationInfo, T>`.
This function is invoked to create an instance of `T`, which is passed to the listener in the usual fashion.
An object of type `FailedDeserializationInfo`, which contains all the contextual information, is provided to the function.
You can find the `DeserializationException` (as a serialized Java object) in headers.
See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer.html[Javadoc] for the `ErrorHandlingDeserializer` for more information.
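
The two behaviors described above (return `null` with the failure recorded in a header, or return a substitute value built by a function) can be sketched with the JDK alone.
Everything in this illustration is hypothetical: `ErrorHandlingSketch`, its nested `FailedDeserialization` type, and the header name are stand-ins, and a plain `Map` plays the role of the record headers.

```java
import java.util.Map;
import java.util.function.Function;

final class ErrorHandlingSketch {

    static final String EXCEPTION_HEADER = "sketchDeserializerException"; // hypothetical header name

    // Carries the cause and the raw bytes of the record that could not be deserialized.
    static final class FailedDeserialization extends RuntimeException {
        final byte[] data;

        FailedDeserialization(byte[] data, Throwable cause) {
            super("Failed to deserialize", cause);
            this.data = data;
        }
    }

    // Delegates deserialization; on failure, records the exception in the headers and
    // returns either null or, if a fallback function is given, the value it creates.
    static <T> T deserialize(Function<byte[], T> delegate, byte[] data, Map<String, Object> headers,
            Function<FailedDeserialization, T> fallback) {
        try {
            return delegate.apply(data);
        }
        catch (RuntimeException ex) {
            FailedDeserialization failure = new FailedDeserialization(data, ex);
            headers.put(EXCEPTION_HEADER, failure);
            return fallback != null ? fallback.apply(failure) : null;
        }
    }
}
```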

You can use the `DefaultKafkaConsumerFactory` constructor that takes key and value `Deserializer` objects and wire in appropriate `ErrorHandlingDeserializer` instances that you have configured with the proper delegates.
Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer`) to instantiate the delegates.
The property names are `ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS`.
The property value can be a class or class name.
The following example shows how to set these properties:

[source, java]
----
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);
----

The following example uses a `failedDeserializationFunction`.

[source, java]
----
public class BadThing extends Thing {

  private final FailedDeserializationInfo failedDeserializationInfo;

  public BadThing(FailedDeserializationInfo failedDeserializationInfo) {
    this.failedDeserializationInfo = failedDeserializationInfo;
  }

  public FailedDeserializationInfo getFailedDeserializationInfo() {
    return this.failedDeserializationInfo;
  }

}

public class FailedThingProvider implements Function<FailedDeserializationInfo, Thing> {

  @Override
  public Thing apply(FailedDeserializationInfo info) {
    return new BadThing(info);
  }

}
----

The preceding example uses the following configuration:

[source, java]
----
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedThingProvider.class);
...
----

IMPORTANT: If the consumer is configured with an `ErrorHandlingDeserializer`, it is important to configure the `KafkaTemplate` and its producer with a serializer that can handle normal objects as well as raw `byte[]` values, which result from deserialization exceptions.
The generic value type of the template should be `Object`.
One technique is to use the `DelegatingByTypeSerializer`; an example follows:

[source, java]
----
@Bean
public ProducerFactory<String, Object> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
    new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
          MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}
----

When using an `ErrorHandlingDeserializer` with a batch listener, you must check for the deserialization exceptions in message headers.
When used with a `DefaultErrorHandler`, you can use that header to determine which record failed and communicate this to the error handler via a `BatchListenerFailedException`.

[source, java]
----
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
    for (int i = 0; i < in.size(); i++) {
        Thing thing = in.get(i);
        if (thing == null
                && headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
            DeserializationException deserEx = null; // declared before the try so the throw below can see it
            try {
                deserEx = SerializationUtils.byteArrayToDeserializationException(this.logger,
                        headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
                if (deserEx != null) {
                    logger.error(deserEx, "Record at index " + i + " could not be deserialized");
                }
            }
            catch (Exception ex) {
                logger.error(ex, "Record at index " + i + " could not be deserialized");
            }
            throw new BatchListenerFailedException("Deserialization", deserEx, i);
        }
        process(thing);
    }
}
----

`SerializationUtils.byteArrayToDeserializationException()` can be used to convert the header to a `DeserializationException`.

When consuming `List<ConsumerRecord<?, ?>>`, `SerializationUtils.getExceptionFromHeader()` is used instead:

[source, java]
----
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
    for (int i = 0; i < in.size(); i++) {
        ConsumerRecord<String, Thing> rec = in.get(i);
        if (rec.value() == null) {
            DeserializationException deserEx = SerializationUtils.getExceptionFromHeader(rec,
                    SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
            if (deserEx != null) {
                logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
                throw new BatchListenerFailedException("Deserialization", deserEx, i);
            }
        }
        process(rec.value());
    }
}
----

IMPORTANT: If you are also using a `DeadLetterPublishingRecoverer`, the record published for a `DeserializationException` has a `record.value()` of type `byte[]`; this value is already raw bytes and should not be serialized again.
Consider using a `DelegatingByTypeSerializer` configured to use a `ByteArraySerializer` for `byte[]` and the normal serializer (JSON, Avro, and so on) for all other types.
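To make the dispatch idea concrete, the following is a minimal sketch that uses only JDK types.
It is not the `DelegatingByTypeSerializer` implementation; the class `ByTypeDispatchSketch`, its `register` and `serialize` methods, and the quoted-string stand-in for the normal serializer are all hypothetical names chosen for illustration.
It shows raw `byte[]` values passing through unchanged while other values go to the delegate registered for their type.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Conceptual sketch only: dispatch by value type with plain JDK types.
// This is NOT the DelegatingByTypeSerializer implementation.
class ByTypeDispatchSketch {

    // Hypothetical registry: value type -> function producing the bytes to send.
    private final Map<Class<?>, Function<Object, byte[]>> delegates = new LinkedHashMap<>();

    <T> ByTypeDispatchSketch register(Class<T> type, Function<? super T, byte[]> delegate) {
        this.delegates.put(type, value -> delegate.apply(type.cast(value)));
        return this;
    }

    byte[] serialize(Object value) {
        // Exact-type lookup; an unknown type is treated as a configuration error.
        Function<Object, byte[]> delegate = this.delegates.get(value.getClass());
        if (delegate == null) {
            throw new IllegalStateException("No delegate registered for " + value.getClass().getName());
        }
        return delegate.apply(value);
    }

    // Builds a dispatcher that resembles the configuration described above.
    static ByTypeDispatchSketch example() {
        return new ByTypeDispatchSketch()
                // Raw bytes from a failed deserialization pass through untouched.
                .register(byte[].class, bytes -> bytes)
                // Stand-in for the normal serializer: wraps the text in quotes.
                .register(String.class, text -> ("\"" + text + "\"").getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByTypeDispatchSketch serializer = example();
        byte[] raw = {1, 2, 3};
        System.out.println(serializer.serialize(raw) == raw);
        System.out.println(new String(serializer.serialize("ok"), StandardCharsets.UTF_8));
    }
}
```

In this sketch, a value of an unregistered type fails fast instead of being silently re-encoded, which is one way to surface a missing delegate early.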

Starting with version 3.1, you can add a `Validator` to the `ErrorHandlingDeserializer`.
If the delegate `Deserializer` successfully deserializes the object, but that object fails validation, an exception is thrown, just as if a deserialization exception had occurred.
This allows the original raw data to be passed to the error handler.
When creating the deserializer yourself, simply call `setValidator`; if you configure the deserializer using properties, set the consumer configuration property `ErrorHandlingDeserializer.VALIDATOR_CLASS` to the class or fully qualified class name for your `Validator`.
When using Spring Boot, this property name is `spring.kafka.consumer.properties.spring.deserializer.validator.class`.
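
The following is a minimal sketch of the deserialize-then-validate flow, written with plain JDK types so that it can be run on its own.
It is not the `ErrorHandlingDeserializer` implementation: `ValidatingDeserializerSketch`, `FailedRecordException`, and `parseInt` are hypothetical names, a `Predicate` stands in for the `Validator`, and the sketch throws for brevity whereas the real deserializer reports failures through record headers.
It shows a validation failure being reported in the same way as a deserialization failure, with the raw bytes preserved.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import java.util.function.Predicate;

// Conceptual sketch with plain JDK types; NOT the ErrorHandlingDeserializer implementation.
class ValidatingDeserializerSketch {

    // Hypothetical failure type that carries the raw payload to an error handler.
    static class FailedRecordException extends RuntimeException {

        final byte[] rawData;

        FailedRecordException(String message, byte[] rawData, Throwable cause) {
            super(message, cause);
            this.rawData = rawData;
        }
    }

    static <T> T deserialize(byte[] data, Function<byte[], T> delegate, Predicate<T> validator) {
        T value;
        try {
            value = delegate.apply(data);
        }
        catch (RuntimeException ex) {
            // The delegate could not decode the payload.
            throw new FailedRecordException("deserialization failed", data, ex);
        }
        if (!validator.test(value)) {
            // Decoding succeeded, but the object is invalid: report it the same way.
            throw new FailedRecordException("validation failed", data, null);
        }
        return value;
    }

    // Stand-in delegate: decodes UTF-8 text and parses it as an integer.
    static Integer parseInt(byte[] data) {
        return Integer.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Predicate<Integer> nonNegative = n -> n >= 0;
        System.out.println(deserialize("42".getBytes(StandardCharsets.UTF_8),
                ValidatingDeserializerSketch::parseInt, nonNegative));
        try {
            deserialize("-1".getBytes(StandardCharsets.UTF_8),
                    ValidatingDeserializerSketch::parseInt, nonNegative);
        }
        catch (FailedRecordException ex) {
            System.out.println(ex.getMessage() + ": " + new String(ex.rawData, StandardCharsets.UTF_8));
        }
    }
}
```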

[[payload-conversion-with-batch]]
== Payload Conversion with Batch Listeners

You can also use a `JsonMessageConverter` within a `BatchMessagingMessageConverter` to convert batch messages when you use a batch listener container factory.
See xref:kafka/serdes.adoc[Serialization, Deserialization, and Message Conversion] and xref:kafka/serdes.adoc#messaging-message-conversion[Spring Messaging Message Conversion] for more information.

By default, the type for the conversion is inferred from the listener argument.
If you configure the `JsonMessageConverter` with a `DefaultJackson2JavaTypeMapper` that has its `TypePrecedence` set to `TYPE_ID` (instead of the default `INFERRED`), the converter uses the type information in headers (if present) instead.
This allows, for example, listener methods to be declared with interfaces instead of concrete classes.
Also, the type converter supports mapping, so the deserialization can be to a different type than the source (as long as the data is compatible).
This is also useful when you use xref:kafka/receiving-messages/class-level-kafkalistener.adoc[class-level `@KafkaListener` instances] where the payload must have already been converted to determine which method to invoke.
The following example creates beans that use this method:

[source, java]
----
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.setBatchMessageConverter(new BatchMessagingMessageConverter(converter()));
    return factory;
}

@Bean
public JsonMessageConverter converter() {
    return new JsonMessageConverter();
}
----

Note that, for this to work, the method signature for the conversion target must be a container object with a single generic parameter type, such as the following:

[source, java]
----
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
----

Note that you can still access the batch headers.

If the batch converter has a record converter that supports it, you can also receive a list of messages where the payloads are converted according to the generic type.
The following example shows how to do so:

[source, java]
----
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen(List<Message<Foo>> fooMessages) {
    ...
}
----

[[conversionservice-customization]]
== `ConversionService` Customization

Starting with version 2.1.1, the `org.springframework.core.convert.ConversionService` used by the default `org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory` to resolve parameters for the invocation of a listener method is supplied with all beans that implement any of the following interfaces:

* `org.springframework.core.convert.converter.Converter`
* `org.springframework.core.convert.converter.GenericConverter`
* `org.springframework.format.Formatter`

This lets you further customize listener deserialization without changing the default configuration for `ConsumerFactory` and `KafkaListenerContainerFactory`.

IMPORTANT: Setting a custom `MessageHandlerMethodFactory` on the `KafkaListenerEndpointRegistrar` through a `KafkaListenerConfigurer` bean disables this feature.

[[custom-arg-resolve]]
== Adding Custom `HandlerMethodArgumentResolver` to `@KafkaListener`

Starting with version 2.4.2, you can add your own `HandlerMethodArgumentResolver` to resolve custom method parameters.
To do so, implement `KafkaListenerConfigurer` and call the `setCustomMethodArgumentResolvers()` method of `KafkaListenerEndpointRegistrar`.

[source, java]
----
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        registrar.setCustomMethodArgumentResolvers(
            new HandlerMethodArgumentResolver() {

                @Override
                public boolean supportsParameter(MethodParameter parameter) {
                    return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
                }

                @Override
                public Object resolveArgument(MethodParameter parameter, Message<?> message) {
                    return new CustomMethodArgument(
                        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
                    );
                }
            }
        );
    }

}
----

You can also completely replace the framework's argument resolution by adding a custom `MessageHandlerMethodFactory` to the `KafkaListenerEndpointRegistrar` bean.
If you do this, and your application needs to handle tombstone records, with a `null` `value()` (e.g. from a compacted topic), you should add a `KafkaNullAwarePayloadArgumentResolver` to the factory; it must be the last resolver because it supports all types and can match arguments without a `@Payload` annotation.
If you are using a `DefaultMessageHandlerMethodFactory`, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard `PayloadMethodArgumentResolver`, which has no knowledge of `KafkaNull` payloads.

See also xref:kafka/tombstones.adoc[Null Payloads and Log Compaction of `Tombstone` Records].

